use crate::physical_plan::{PhysicalPlanExpr, PhysicalPlanExprArg};
use crate::run_plan::{HandResult, HandResultList};
use crate::data_frame::{DataInstance, value::RealValue};
use async_recursion::async_recursion;
use crate::data_frame::single_data::SingleData;
use crate::physical_plan::window::WindowElement;

/// One argument of a [`RunPlanExprFunc`].
#[derive(Debug,Clone)]
pub enum RunPlanPhysicalPlanFuncArg{
    /// An argument that is already a `PhysicalPlanExprArg`; it is cloned
    /// unchanged each time the argument list is built.
    Normal(Box<PhysicalPlanExprArg>),
    /// A nested function. It is evaluated per row with `evaluate_single` and
    /// its value is converted with `PhysicalPlanExprArg::from_real_value`.
    OtherRunPlan(Box<RunPlanExprFunc>),
}

/// A function call in the run plan: a physical expression together with the
/// arguments it is evaluated with.
#[derive(Debug,Clone)]
pub struct RunPlanExprFunc {
    /// Expression invoked with the resolved argument slice and the current row.
    physical_plan: Box<PhysicalPlanExpr>,
    /// Argument descriptors, resolved in order before each evaluation.
    args: Vec<RunPlanPhysicalPlanFuncArg>,
}

/// Boolean combination tree of [`RunPlanExprFunc`]s, evaluated row-wise by
/// `evaluate`.
#[derive(Debug)]
pub enum RunPlanExprFuncComb{
    /// Leaf: the rows are exactly the results of the wrapped function.
    Single(Box<RunPlanExprFunc>),
    /// Both sides are always evaluated. A row is `Ok(Boolean(l & r))` only when
    /// both sides are `Ok(Boolean(_))`; any other combination yields `Err(())`.
    And(Box<RunPlanExprFuncComb>,Box<RunPlanExprFuncComb>),
    /// Both sides are always evaluated. A row is `Ok(Boolean(true))` when the
    /// left side is `Ok(Boolean(true))`, whatever the right side produced. When
    /// the left side is `Ok(Boolean(false))` the row is the right side's value
    /// if that is `Ok(Boolean(_))`. Every other combination yields `Err(())`.
    Or(Box<RunPlanExprFuncComb>,Box<RunPlanExprFuncComb>),
}

/// A [`RunPlanExprFuncComb`] paired with a name.
#[derive(Debug)]
pub struct RunPlanExprFuncCombWithName{
    /// Name attached to the combination. It is not read in this file —
    /// presumably an output/alias name; TODO confirm against callers.
    pub name: String,
    /// The combination tree that `evaluate` delegates to.
    pub func_comb: Box<RunPlanExprFuncComb>,
}

impl RunPlanExprFunc {
    #[async_recursion]
    pub async fn get_arg_list(&mut self, data: &mut SingleData)->Option<Vec<Box<PhysicalPlanExprArg>>>{
        let mut one_func_arg = Vec::new();
        let mut arg_has_fail = false;

        for arg in self.args.iter_mut(){
            match arg{
                RunPlanPhysicalPlanFuncArg::Normal(arg) => {
                    one_func_arg.push(arg.clone());
                },
                RunPlanPhysicalPlanFuncArg::OtherRunPlan(ref mut func) => {
                    match func.evaluate_single(data).await{
                        HandResult::Ok(r) => {
                            one_func_arg.push(PhysicalPlanExprArg::from_real_value(r));
                        },
                        HandResult::Err(_) => {
                            arg_has_fail = true;
                            break;
                        },
                    }
                },
            }
        }

        if arg_has_fail{
            return None;
        }else{
            return Some(one_func_arg);
        }
    }
    #[async_recursion]
    pub async fn evaluate(&mut self, data: &mut DataInstance)-> HandResultList {

        match data{
            DataInstance::BatchData(batch_data) => {
                let mut arg_list = Vec::new();

                for data in batch_data.iter_mut(){
                    arg_list.push(self.get_arg_list(data).await);
                }

                let mut hand_list = Vec::new();
                for (idx,data) in batch_data.iter_mut().enumerate(){
                    let arg = &arg_list[idx];
                    if let Some(arg) = arg{
                        hand_list.push(self.physical_plan.evaluate(arg.as_slice(), data).await);
                    }else{
                        hand_list.push(HandResult::Err(()));
                    }
                }
                return hand_list;
            },
            DataInstance::GroupData(group_data) => {
                let mut arg_list = Vec::new();

                for data in group_data.iter_mut(){
                    arg_list.push(self.get_arg_list(data.index_value_mut()).await);
                }

                let mut hand_list = Vec::new();
                for (idx,data) in group_data.iter_mut().enumerate(){
                    let arg = &arg_list[idx];
                    if let Some(arg) = arg{
                        hand_list.push(self.physical_plan.evaluate(arg.as_slice(), data.index_value_mut()).await);
                    }else{
                        hand_list.push(HandResult::Err(()));
                    }
                }
                return hand_list;
            },
        }
    }
    #[async_recursion]
    pub async fn evaluate_single(&mut self, single_data: &mut SingleData) -> HandResult{
        let mut args = Vec::new();

        for arg in self.args.iter_mut(){
            match arg{
                RunPlanPhysicalPlanFuncArg::Normal(arg) => {
                    args.push(arg.clone());
                },
                RunPlanPhysicalPlanFuncArg::OtherRunPlan(ref mut func) => {
                    let arg = match func.evaluate_single(single_data).await{
                        HandResult::Ok(t) => {t},
                        HandResult::Err(_e) => {return HandResult::Err(());},
                    };

                    args.push(PhysicalPlanExprArg::from_real_value(arg));
                },
            }
        }

        return self.physical_plan.evaluate(args.as_slice(), single_data).await;
    }
}

impl RunPlanExprFuncCombWithName{
    /// Evaluates the wrapped combination against `data` and returns its
    /// per-row results unchanged; `name` plays no part in the evaluation.
    #[async_recursion]
    pub async fn evaluate(&mut self, data: &mut DataInstance)->HandResultList{
        self.func_comb.evaluate(data).await
    }
}

impl RunPlanExprFuncComb{
    #[async_recursion]
    pub async fn evaluate(&mut self, data: &mut DataInstance)-> HandResultList {
        match self{
            RunPlanExprFuncComb::Single(f) => {return f.evaluate(data).await;},
            RunPlanExprFuncComb::And(left,right) => {
                let left_result = left.evaluate(data).await;
                let right_result = right.evaluate(data).await;

                assert_eq!(left_result.len(), right_result.len());

                let mut result = Vec::new();
                for (idx,left_result) in left_result.iter().enumerate(){
                    if let HandResult::Ok(left_result) = left_result{
                        if let RealValue::Boolean(left_result) = left_result{
                            if let HandResult::Ok(right_result) = &right_result[idx]{
                                if let RealValue::Boolean(right_result) = right_result{
                                    result.push(HandResult::Ok(RealValue::Boolean(left_result & right_result)));
                                }else{
                                    result.push(HandResult::Err(()));
                                }
                            }else{
                                result.push(HandResult::Err(()));
                            }
                        }else{
                            result.push(HandResult::Err(()));
                        }
                    }else{
                        result.push(HandResult::Err(()));
                    }
                }

                return result;
            },
            RunPlanExprFuncComb::Or(left,right) => {
                let left_result = left.evaluate(data).await;
                let right_result = right.evaluate(data).await;

                assert_eq!(left_result.len(), right_result.len());

                let mut result = Vec::new();
                for (idx,left_result) in left_result.iter().enumerate(){
                    if let HandResult::Ok(left_result) = left_result{
                        if let RealValue::Boolean(left_result) = left_result{
                            if *left_result{
                                result.push(HandResult::Ok(RealValue::Boolean(true)));
                            }else{
                                if let HandResult::Ok(right_result) = &right_result[idx]{
                                    if let RealValue::Boolean(right_result) = right_result{
                                        result.push(HandResult::Ok(RealValue::Boolean(*right_result)));
                                    }else{
                                        result.push(HandResult::Err(()));
                                    }
                                }else{
                                    result.push(HandResult::Err(()));
                                }
                            }
                        }else{
                            result.push(HandResult::Err(()));
                        }
                    }else{
                        result.push(HandResult::Err(()));
                    }
                }

                return result;
            },
        }
    }
}

impl RunPlanExprFunc {
    /// Creates a heap-allocated [`RunPlanExprFunc`] from the expression to
    /// evaluate and the argument descriptors it will be evaluated with.
    pub fn new(physical_plan: Box<PhysicalPlanExpr>, args: Vec<RunPlanPhysicalPlanFuncArg>)
               ->Box<RunPlanExprFunc>{
        let func = RunPlanExprFunc { physical_plan, args };
        Box::new(func)
    }
}
